모형 최적화 분산 처리

ipyparallel

  • Engine <-> Client
    • Engine: 실제 계산이 실행되는 프로세스
    • Client: 엔진을 제어하기 위한 인터페이스
$ conda install ipyparallel

Engine 가동/중지

  • 가동

    $ ipcluster start -n 4
  • 중지

    • Control-C
  • 코어가 1개당 워커는 1개로 지정. 8개라고 해서 8개 다 돌리면 안 된다.
  • 프론트엔드를 가진 프로그램이 있고 가지지 않은 프로그램이 있다.
  • 코어가 1개면 무의미. 코어가 여러개 있을 경우에 Ipython Clusters에서 engines을 여러개로 동시에 돌릴 수 있다.
  • 프론트엔드? 일반적으로 프론트엔드와 벡엔드라는 용어는 프로세스의 처음과 마지막 단계를 가리킨다. 프론트엔드는 사용자로부터 다양한 형태의 입력을 받아 벡엔드가 사용할 수 있는 규격을 따라 처리할 책임을 진다. 프론트엔드 개발자는 사용자가 접하게 되는 외적 부분의 구성 및 기능 구현에 중점을 두는 개발자, 백엔드개발자는 DB나 서버처럼 사용자가 접하지 않는 부분의 내부 지원을 담당하는 개발자
  • 원래 크롤링은 동시에 수백개를 돌린다는 의미다. 크롤링 할 수 있는 것은 scrapy가 파이썬에서 유일하다.
  • 그런데 이거 왜 안 배웠지? scrapy 설정을 잘 해야 한다. 안 그러면 디도스로 판단되어 막혀버린다.
  • 중간에 어디다가 저장하는 것이 파싱? LXML? BS는 너무 느려서 사실 잘 안 쓰인다. 스크래피는 자체 파서가 있다. 그래서 그걸 쓰면 된다.
  • XPath? [@id="comment_wrapper"]/div[2]/div/div[2] 이런식으로 복사해서 쓰면 된다.
  • CSS selecter 문법과 유사하다.
  • robots.txt에서 크롤링해도 된다. 안된다가 나온다. 법적으로

Client


In [1]:
from ipyparallel import Client
c = Client()
c.ids


Out[1]:
[0, 1, 2, 3]

In [2]:
dview = c[:]
dview


Out[2]:
<DirectView [0, 1, 2, 3]>

Map / Reduce

빅데이터 할 때 듣는 용어. 분산처리 할 때

  • map(function, data): data 각각에 function을 실행하여 결과 출력. data를 리스트로 넣는다.
  • reduce(function, data): function을 실행할 때 마다 결과의 수가 감소. 최종적으로 하나의 수가 남는다. 대표적인 것이 counting(예를 들어 뉴스그룹. 몇 십 년치 데이터면 컴퓨터 몇 십대인데 이럴 경우에)

In [3]:
def fahrenheit(T):
    return 9 / 5 * T + 32

temp = np.arange(0, 110, 10)
temp


Out[3]:
array([  0,  10,  20,  30,  40,  50,  60,  70,  80,  90, 100])

In [15]:
F = map(fahrenheit, temp)
F, list(F)


Out[15]:
(<map at 0xb0becf8>,
 [32.0, 50.0, 68.0, 86.0, 104.0, 122.0, 140.0, 158.0, 176.0, 194.0, 212.0])

In [6]:
def create_prime(primes, n):
    for p in primes:
        if n % p == 0:
            return primes
    primes.append(n)
    return primes

In [8]:
from functools import reduce

In [11]:
reduce(create_prime, np.arange(2, 100), [2])


Out[11]:
[2,
 3,
 5,
 7,
 11,
 13,
 17,
 19,
 23,
 29,
 31,
 37,
 41,
 43,
 47,
 53,
 59,
 61,
 67,
 71,
 73,
 79,
 83,
 89,
 97]

Parallel Map

  • map/reduce 연산을 engine process들에게 맡겨서 동시 실행

In [16]:
def pyprimes(kmax):   #의미 생각하지 말고 소수 구하는 복잡한 함수다 정도만 알아두어라
    p = np.zeros(1000)
    result = []
    if kmax > 1000:
        kmax = 1000
    k = 0
    n = 2
    while k < kmax:
        i = 0
        while i < k and n % p[i] != 0:
            i = i + 1
        if i == k:
            p[k] = n
            k = k + 1
            result.append(n)
        n = n + 1
    return result

In [19]:
%time result = map(pyprimes, range(700, 1000))   #도커 안이라서 아래와 이것과 시간이 같게 나올 것이다. 아래 거는 서버에서 돌리면 다를듯


Wall time: 0 ns

In [20]:
%time parallel_result = dview.map_sync(pyprimes, range(700, 1000))   #6명 중 1명이라도 답을 안준다면 안 주고 다 끝나고 나서 끝이다.


Wall time: 25.8 s

In [21]:
parallel_result == result


Out[21]:
False

In [22]:
async_result = dview.map_async(pyprimes, range(700, 1000))   #안 끝나도 중간에 제어권 돌려주고 모니터링 알아서 해라.

In [25]:
async_result.progress   #몇 명이 완성했는지 알려준다.


Out[25]:
4

In [24]:
async_result.get()[0][-10:]


Out[24]:
[5189, 5197, 5209, 5227, 5231, 5233, 5237, 5261, 5273, 5279]

모형 저장

모형을 분산처리하기 위해서는 sklearn.externals 서브패키지의 joblib.dump 명령과 joblib.load 명령 사용

pikle형태로 지금의 모델 안에 어트리뷰트 가진 형태대로 세이브 하고 긁어오고 한다.

ipyparalle 을 사용한 분산 모형 최적화


In [26]:
from sklearn.datasets import fetch_20newsgroups
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.svm import SVC
from sklearn.pipeline import Pipeline

In [28]:
news = fetch_20newsgroups(subset="all")
n_samples = 3000
X_train = news.data[:n_samples]
y_train = news.target[:n_samples]

model = Pipeline([
        ('vect', TfidfVectorizer(stop_words="english", token_pattern="\b[a-z0-9_\-\.]+[a-z][a-z0-9_\-\.]+\b")),
        ('svc', SVC()),
    ])

In [29]:
from sklearn.externals import joblib
import os
from sklearn.cross_validation import KFold, cross_val_score

In [30]:
def persist_cv_splits(X, y, K=3, name="data", suffix="_cv_%03d.pkl"):    #데이터를 나눈다. 나눠서 저장한다.
    cv_split_filenames = []
    cv = KFold(n_samples, K, shuffle=True, random_state=0)
    for i, (train, test) in enumerate(cv):
        cv_fold = ([X[k] for k in train], y[train],     
                   [X[k] for k in test], y[test])
        cv_split_filename = name + suffix % i
        cv_split_filename = os.path.abspath(cv_split_filename)
        joblib.dump(cv_fold, cv_split_filename)
        cv_split_filenames.append(cv_split_filename)
    return cv_split_filenames

cv_filenames = persist_cv_splits(X_train, y_train, name="news")
cv_filenames


Out[30]:
['C:\\Users\\Administrator\\@수학\\160704월_24일차_모형 최적화 Model optimization\\news_cv_000.pkl',
 'C:\\Users\\Administrator\\@수학\\160704월_24일차_모형 최적화 Model optimization\\news_cv_001.pkl',
 'C:\\Users\\Administrator\\@수학\\160704월_24일차_모형 최적화 Model optimization\\news_cv_002.pkl']

In [31]:
def compute_evaluation(cv_split_filename, model, params):
    from sklearn.externals import joblib
    X_train_, y_train_, X_test_, y_test_ = joblib.load(cv_split_filename, mmap_mode="c")
    model.set_params(**params)
    model.fit(X_train_, y_train_)
    test_scores = model.score(X_test_, y_test_)
    return test_scores

In [32]:
from sklearn.grid_search import ParameterGrid
def parallel_grid_search(lb_view, model, cv_split_filenames, param_grid):   #lb_view 엔진에 대한 view. 
    all_tasks = []
    all_parameters = list(ParameterGrid(param_grid))
    for i, params in enumerate(all_parameters):
        task_for_params = []
        for j, cv_split_filename in enumerate(cv_split_filenames):
            t = lb_view.apply(compute_evaluation, cv_split_filename, model, params)  #map이랑 유사. apply는 하나짜리 함수 실행. 여기 말고 엔진에 가서 실행
            task_for_params.append(t)
        all_tasks.append(task_for_params)
    return all_parameters, all_tasks

In [33]:
import datetime
def print_progress(tasks):
    progress = np.mean([task.ready() for task_group in tasks for task in task_group])
    print("{0}:{1}%".format(datetime.datetime.now(), progress * 100.0))
    return int(progress * 100.0)

In [34]:
from ipyparallel import Client
client = Client()
print(client.ids)
lb_view = client.load_balanced_view()


[0, 1, 2, 3]

In [35]:
from sklearn.grid_search import GridSearchCV
parameters = {
    "svc__gamma": np.logspace(-2, 1, 4),
    "svc__C": np.logspace(-1, 1, 3),
}

In [36]:
all_parameters, all_tasks = parallel_grid_search(lb_view, model, cv_filenames, parameters)

In [37]:
import time
start_time = datetime.datetime.now()
while True:
    progress = print_progress(all_tasks)
    if progress >= 100:
        break
    time.sleep(1)
print("finish")
end_time = datetime.datetime.now()
print((end_time - start_time).total_seconds())


2016-09-19 09:27:36.316694:0.0%
2016-09-19 09:27:37.320708:52.77777777777778%
2016-09-19 09:27:38.323524:100.0%
finish
2.00683

In [ ]: